-
Notifications
You must be signed in to change notification settings - Fork 0
MCP over...Matrix? #9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
📝 WalkthroughSummary by CodeRabbit
WalkthroughAdds Matrix-based Desktop MCP IPC: device registration and metadata, JSON‑RPC framing over m.room.message, an MCPMatrixClient with request/response lifecycle, tool discovery/caching/invocation, MCP relay identity and portal creation, IPC handlers for desktop_hello, MCP responses, and notifications. Changes
Sequence Diagram(s)sequenceDiagram
participant Desktop as Desktop App
participant Matrix as Matrix Server
participant Connector as Connector
participant MCP as MCPMatrixClient
participant AI as AIClient
Desktop->>Matrix: m.room.message (IPC: desktop_hello)
Matrix->>Connector: deliver IPC m.room.message
Connector->>Connector: parse IPCContent, validate device_id
Connector->>Connector: update UserLoginMetadata (DesktopDevices), set preferred if absent
Connector->>MCP: async Initialize(roomID, deviceID)
MCP->>Matrix: m.room.message (mcp_request: initialize)
Matrix->>Desktop: forward initialize request
Desktop->>Matrix: m.room.message (mcp_response)
Matrix->>Connector: deliver m.room.message (mcp_response)
Connector->>MCP: HandleResponse -> match pending request, deliver result
MCP->>MCP: cache/list tools (on init or notification)
AI->>MCP: CallTool(roomID, deviceID, toolName, args)
MCP->>Matrix: m.room.message (mcp_request: resources/call)
Matrix->>Desktop: forward tool call
Desktop->>Matrix: m.room.message (mcp_response: tool result)
Matrix->>Connector: deliver m.room.message (mcp_response)
Connector->>MCP: HandleResponse -> parse result -> return to AI
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes 🚥 Pre-merge checks | ❌ 3❌ Failed checks (2 warnings, 1 inconclusive)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
pkg/connector/chat.go (1)
122-177:⚠️ Potential issue | 🟡 MinorAvoid duplicate tool names when merging MCP tools.
If a Desktop MCP tool shares a name with a builtin tool, the UI will show duplicates and toggles become ambiguous. Consider de-duplicating (or prefixing) MCP entries.🧩 Example de-dup approach
- var tools []ToolInfo + var tools []ToolInfo + seen := make(map[string]struct{}) // Add builtin tools from config for name, entry := range meta.ToolsConfig.Tools { if entry == nil { continue } @@ tools = append(tools, ToolInfo{ Name: name, @@ }) + seen[name] = struct{}{} } // Add Desktop MCP tools if available if oc.mcpClient != nil && oc.mcpClient.HasDesktopTools() { mcpTools := oc.mcpClient.GetDesktopTools() for _, mcpTool := range mcpTools { + if _, exists := seen[mcpTool.Name]; exists { + continue + } tools = append(tools, ToolInfo{ Name: mcpTool.Name, @@ }) + seen[mcpTool.Name] = struct{}{} } }pkg/connector/metadata.go (1)
3-9:⚠️ Potential issue | 🟠 Majorgoimports hook is failing on this file.
CI reports a formatting change; please re-run goimports on this file to satisfy the hook.
🤖 Fix all issues with AI agents
In `@pkg/connector/commands.go`:
- Around line 533-548: The lint S1039 warning is caused by using fmt.Sprintf for
a constant string in the final sb.WriteString call; update the code in the block
that builds the output (using the strings.Builder variable sb) to call
sb.WriteString with the literal "\nUse `!ai desktop prefer <device_id>` to set
the preferred device" instead of sb.WriteString(fmt.Sprintf(...)); leave other
fmt.Sprintf uses (which interpolate variables like device.DeviceName/DeviceID
and online/toolCount) unchanged.
In `@pkg/connector/connector.go`:
- Around line 436-453: The Desktop Hello handler currently always uses
user.GetDefaultLogin() so devices get registered to the default AI account; to
support multi-login you must either include a login/account identifier in the
Desktop Hello event and use that to select the correct login (e.g., look up
user.GetLoginByID(...) or equivalent) when registering the device, or explicitly
document that single-login-per-user is required. Update the Desktop Hello
parsing/registration code to mirror the mapping logic used in
getLoginForPortal() (use the portal/receiver->login mapping pattern) and ensure
the selected login is type-asserted to *AIClient (as currently done) before
proceeding; if you choose the event-change route, also update event producers to
include the login id and fallback to GetDefaultLogin() only when the identifier
is absent.
In `@pkg/connector/mcp_client.go`:
- Around line 190-233: The loop in ListTools uses an unsafe assertion
toolMap["name"].(string) which can panic for malformed MCP responses; update the
loop in ListTools to safely type-check the "name" entry on toolMap (e.g., v, ok
:= toolMap["name"].(string)) and if it's missing or not a string, skip that
entry (continue) or return a clear error; ensure MCPTool.Name is only set when
the check passes and keep the existing safe checks for "description" and
"inputSchema".
- Around line 283-322: GetPreferredDevice, HasDesktopTools and GetDesktopTools
access MCPMatrixClient.DesktopDevices and PreferredDesktopDeviceID without
synchronization while handleDesktopHelloEvent and command handlers mutate them;
add a sync.RWMutex (e.g. devicesMu) to the MCPMatrixClient struct, use
devicesMu.RLock()/RUnlock() in GetPreferredDevice, HasDesktopTools and
GetDesktopTools and devicesMu.Lock()/Unlock() in handleDesktopHelloEvent and all
command handlers that modify DesktopDevices or PreferredDesktopDeviceID, and
when returning device.Tools from GetDesktopTools return a copied slice to avoid
sharing mutable memory.
| // Find the user login for the sender | ||
| user, err := oc.br.GetUserByMXID(ctx, evt.Sender) | ||
| if err != nil || user == nil { | ||
| log.Warn().Err(err).Msg("Failed to get user for desktop hello") | ||
| return | ||
| } | ||
|
|
||
| login := user.GetDefaultLogin() | ||
| if login == nil { | ||
| log.Warn().Msg("User has no active login") | ||
| return | ||
| } | ||
|
|
||
| client, ok := login.Client.(*AIClient) | ||
| if !ok || client == nil { | ||
| log.Warn().Msg("Invalid client type for user login") | ||
| return | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
git ls-files | head -20Repository: beeper/ai-bridge
Length of output: 430
🏁 Script executed:
# Find the connector.go file and check its size
find . -name "connector.go" -type fRepository: beeper/ai-bridge
Length of output: 88
🏁 Script executed:
# Get context around the mentioned lines
sed -n '430,460p' pkg/connector/connector.goRepository: beeper/ai-bridge
Length of output: 797
🏁 Script executed:
# Search for GetDefaultLogin usage to understand the pattern
rg "GetDefaultLogin" -A 3 -B 3Repository: beeper/ai-bridge
Length of output: 2732
🏁 Script executed:
# Search for multi-login or multiple login related code
rg -i "multi.*login|multiple.*login" -A 2 -B 2Repository: beeper/ai-bridge
Length of output: 529
🏁 Script executed:
# Look for device ID handling in Desktop/MCP context
rg -i "device.*id|deviceid" -A 2 -B 2 | head -50Repository: beeper/ai-bridge
Length of output: 2702
🏁 Script executed:
# Check for Desktop MCP event handling patterns
rg -i "desktop.*mcp|mcp.*desktop" -A 3 -B 3Repository: beeper/ai-bridge
Length of output: 9961
🏁 Script executed:
# Search for Client struct and login-related fields to understand the architecture
ast-grep --pattern $'type $_ struct {
$$$
Client $_
$$$
}'Repository: beeper/ai-bridge
Length of output: 290
🏁 Script executed:
# Get the full handleDesktopHelloEvent function
rg "handleDesktopHelloEvent" -A 40Repository: beeper/ai-bridge
Length of output: 5199
🏁 Script executed:
# Look for DesktopHelloContent or event content structure
rg "DesktopHelloContent|type.*Hello" -B 2 -A 5Repository: beeper/ai-bridge
Length of output: 1138
🏁 Script executed:
# Check how devices are initially registered and if there's any login mapping
rg "DesktopDeviceInfo|DesktopDevices\[" -B 3 -A 3Repository: beeper/ai-bridge
Length of output: 4185
🏁 Script executed:
# Look for any device_id in the Desktop Hello event processing
rg "device.*id|deviceID" -i -B 5 -A 5 | grep -A 10 -B 10 "hello\|Hello"Repository: beeper/ai-bridge
Length of output: 2650
🏁 Script executed:
# Check if there's any device-to-login mapping based on room ID or other identifiers
rg "handleDesktopHelloEvent" -A 60 | head -80Repository: beeper/ai-bridge
Length of output: 5196
🏁 Script executed:
# Look for any discussion or TODO comments about multi-login device mapping
rg -i "todo|fixme|xxx|hack" | grep -i "device\|login\|desktop" | head -20Repository: beeper/ai-bridge
Length of output: 42
🏁 Script executed:
# Check if there's any lookup mechanism to find the correct login for a device
rg "GetUserLogins|logins\|" -B 2 -A 2Repository: beeper/ai-bridge
Length of output: 451
🏁 Script executed:
# Get more context on the full handleDesktopHelloEvent implementation
sed -n '430,480p' pkg/connector/connector.goRepository: beeper/ai-bridge
Length of output: 1319
🏁 Script executed:
# Check if there's any way to determine which login owns a device based on other signals
rg "handleMCPResponseEvent" -A 30 | head -50Repository: beeper/ai-bridge
Length of output: 3322
Consider mapping Desktop devices to the correct login when users have multiple AI accounts.
The Desktop Hello event (line 443) uses user.GetDefaultLogin() to register the device. Multi-login is supported in this codebase, but the Desktop Hello event content contains only device_id, device_name, and app_version—no login or account identifier. This means a device will always be registered to the default login, not necessarily the intended account.
The codebase already demonstrates the proper pattern in getLoginForPortal(), which maps to the correct login via the portal's receiver. Consider either:
- Adding a login/account identifier to the Desktop Hello event so the device associates with the correct account, or
- Documenting that single-login-per-user is the expected deployment model
🤖 Prompt for AI Agents
In `@pkg/connector/connector.go` around lines 436 - 453, The Desktop Hello handler
currently always uses user.GetDefaultLogin() so devices get registered to the
default AI account; to support multi-login you must either include a
login/account identifier in the Desktop Hello event and use that to select the
correct login (e.g., look up user.GetLoginByID(...) or equivalent) when
registering the device, or explicitly document that single-login-per-user is
required. Update the Desktop Hello parsing/registration code to mirror the
mapping logic used in getLoginForPortal() (use the portal/receiver->login
mapping pattern) and ensure the selected login is type-asserted to *AIClient (as
currently done) before proceeding; if you choose the event-change route, also
update event producers to include the login id and fallback to GetDefaultLogin()
only when the identifier is absent.
| // ListTools requests the list of available tools from a device | ||
| func (c *MCPMatrixClient) ListTools(ctx context.Context, roomID id.RoomID, deviceID string) ([]MCPTool, error) { | ||
| resp, err := c.Call(ctx, roomID, deviceID, "tools/list", nil) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("tools/list failed: %w", err) | ||
| } | ||
|
|
||
| if resp.Error != nil { | ||
| return nil, fmt.Errorf("tools/list error: %s", resp.Error.Message) | ||
| } | ||
|
|
||
| // Parse tools from result | ||
| result, ok := resp.Result.(map[string]any) | ||
| if !ok { | ||
| return nil, fmt.Errorf("invalid tools/list result type") | ||
| } | ||
|
|
||
| toolsRaw, ok := result["tools"].([]any) | ||
| if !ok { | ||
| return nil, fmt.Errorf("invalid tools field in result") | ||
| } | ||
|
|
||
| var tools []MCPTool | ||
| for _, t := range toolsRaw { | ||
| toolMap, ok := t.(map[string]any) | ||
| if !ok { | ||
| continue | ||
| } | ||
|
|
||
| tool := MCPTool{ | ||
| Name: toolMap["name"].(string), | ||
| } | ||
| if desc, ok := toolMap["description"].(string); ok { | ||
| tool.Description = desc | ||
| } | ||
| if schema, ok := toolMap["inputSchema"].(map[string]any); ok { | ||
| tool.InputSchema = schema | ||
| } | ||
|
|
||
| tools = append(tools, tool) | ||
| } | ||
|
|
||
| return tools, nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
# First, let's verify the file exists and read the relevant section
cat -n pkg/connector/mcp_client.go | head -250 | tail -100Repository: beeper/ai-bridge
Length of output: 3458
🏁 Script executed:
# Get more context around lines 190-233
sed -n '180,240p' pkg/connector/mcp_client.goRepository: beeper/ai-bridge
Length of output: 1686
🏁 Script executed:
# Search for MCPTool definition
rg -A 10 "type MCPTool struct" pkg/connector/Repository: beeper/ai-bridge
Length of output: 363
🏁 Script executed:
# Check for similar type assertions in the file
rg -n "\.\(string\)|\.\(map\[string\]any\)" pkg/connector/mcp_client.go | head -20Repository: beeper/ai-bridge
Length of output: 504
Add safe type check for "name" field to prevent panics from malformed MCP responses.
Line 220 uses unsafe type assertion toolMap["name"].(string) without validation. While optional fields like description and inputSchema are safely checked, the required name field is missing protection. Since MCP responses come from external devices, this can panic if the field is missing or has wrong type. Add a type check before accessing:
Suggested fix
- tool := MCPTool{
- Name: toolMap["name"].(string),
- }
+ name, ok := toolMap["name"].(string)
+ if !ok || name == "" {
+ continue
+ }
+ tool := MCPTool{
+ Name: name,
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // ListTools requests the list of available tools from a device | |
| func (c *MCPMatrixClient) ListTools(ctx context.Context, roomID id.RoomID, deviceID string) ([]MCPTool, error) { | |
| resp, err := c.Call(ctx, roomID, deviceID, "tools/list", nil) | |
| if err != nil { | |
| return nil, fmt.Errorf("tools/list failed: %w", err) | |
| } | |
| if resp.Error != nil { | |
| return nil, fmt.Errorf("tools/list error: %s", resp.Error.Message) | |
| } | |
| // Parse tools from result | |
| result, ok := resp.Result.(map[string]any) | |
| if !ok { | |
| return nil, fmt.Errorf("invalid tools/list result type") | |
| } | |
| toolsRaw, ok := result["tools"].([]any) | |
| if !ok { | |
| return nil, fmt.Errorf("invalid tools field in result") | |
| } | |
| var tools []MCPTool | |
| for _, t := range toolsRaw { | |
| toolMap, ok := t.(map[string]any) | |
| if !ok { | |
| continue | |
| } | |
| tool := MCPTool{ | |
| Name: toolMap["name"].(string), | |
| } | |
| if desc, ok := toolMap["description"].(string); ok { | |
| tool.Description = desc | |
| } | |
| if schema, ok := toolMap["inputSchema"].(map[string]any); ok { | |
| tool.InputSchema = schema | |
| } | |
| tools = append(tools, tool) | |
| } | |
| return tools, nil | |
| } | |
| // ListTools requests the list of available tools from a device | |
| func (c *MCPMatrixClient) ListTools(ctx context.Context, roomID id.RoomID, deviceID string) ([]MCPTool, error) { | |
| resp, err := c.Call(ctx, roomID, deviceID, "tools/list", nil) | |
| if err != nil { | |
| return nil, fmt.Errorf("tools/list failed: %w", err) | |
| } | |
| if resp.Error != nil { | |
| return nil, fmt.Errorf("tools/list error: %s", resp.Error.Message) | |
| } | |
| // Parse tools from result | |
| result, ok := resp.Result.(map[string]any) | |
| if !ok { | |
| return nil, fmt.Errorf("invalid tools/list result type") | |
| } | |
| toolsRaw, ok := result["tools"].([]any) | |
| if !ok { | |
| return nil, fmt.Errorf("invalid tools field in result") | |
| } | |
| var tools []MCPTool | |
| for _, t := range toolsRaw { | |
| toolMap, ok := t.(map[string]any) | |
| if !ok { | |
| continue | |
| } | |
| name, ok := toolMap["name"].(string) | |
| if !ok || name == "" { | |
| continue | |
| } | |
| tool := MCPTool{ | |
| Name: name, | |
| } | |
| if desc, ok := toolMap["description"].(string); ok { | |
| tool.Description = desc | |
| } | |
| if schema, ok := toolMap["inputSchema"].(map[string]any); ok { | |
| tool.InputSchema = schema | |
| } | |
| tools = append(tools, tool) | |
| } | |
| return tools, nil | |
| } |
🤖 Prompt for AI Agents
In `@pkg/connector/mcp_client.go` around lines 190 - 233, The loop in ListTools
uses an unsafe assertion toolMap["name"].(string) which can panic for malformed
MCP responses; update the loop in ListTools to safely type-check the "name"
entry on toolMap (e.g., v, ok := toolMap["name"].(string)) and if it's missing
or not a string, skip that entry (continue) or return a clear error; ensure
MCPTool.Name is only set when the check passes and keep the existing safe checks
for "description" and "inputSchema".
| // GetPreferredDevice returns the preferred desktop device for a user | ||
| // Falls back to the most recently seen device if no preference is set | ||
| func (c *MCPMatrixClient) GetPreferredDevice() *DesktopDeviceInfo { | ||
| meta := loginMetadata(c.client.UserLogin) | ||
| if len(meta.DesktopDevices) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| // Check preferred device first | ||
| if meta.PreferredDesktopDeviceID != "" { | ||
| if device, ok := meta.DesktopDevices[meta.PreferredDesktopDeviceID]; ok { | ||
| return device | ||
| } | ||
| } | ||
|
|
||
| // Fall back to most recently seen device | ||
| var latestDevice *DesktopDeviceInfo | ||
| for _, device := range meta.DesktopDevices { | ||
| if latestDevice == nil || device.LastSeen > latestDevice.LastSeen { | ||
| latestDevice = device | ||
| } | ||
| } | ||
|
|
||
| return latestDevice | ||
| } | ||
|
|
||
| // HasDesktopTools returns true if there are any MCP tools available from connected desktops | ||
| func (c *MCPMatrixClient) HasDesktopTools() bool { | ||
| device := c.GetPreferredDevice() | ||
| return device != nil && len(device.Tools) > 0 | ||
| } | ||
|
|
||
| // GetDesktopTools returns the cached MCP tools from the preferred device | ||
| func (c *MCPMatrixClient) GetDesktopTools() []MCPTool { | ||
| device := c.GetPreferredDevice() | ||
| if device == nil { | ||
| return nil | ||
| } | ||
| return device.Tools | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
rg -n "DesktopDevices|PreferredDesktopDeviceID" -g '*.go' -B 2 -A 2Repository: beeper/ai-bridge
Length of output: 4879
🏁 Script executed:
rg -n "type MCPMatrixClient\|type loginMetadata\|type DesktopDeviceInfo" -g '*.go' -A 10Repository: beeper/ai-bridge
Length of output: 42
🏁 Script executed:
rg -n "go\s+\(|go\s+func|\\.Lock\\(|\\.RLock\\(|sync\\.Mutex|sync\\.RWMutex" -g '*.go' | head -50Repository: beeper/ai-bridge
Length of output: 1164
🏁 Script executed:
rg -n "func loginMetadata" -g '*.go' -A 5Repository: beeper/ai-bridge
Length of output: 443
🏁 Script executed:
rg -n "func (c \*MCPMatrixClient) GetPreferredDevice\|func (c \*MCPMatrixClient) HasDesktopTools\|func (c \*MCPMatrixClient) GetDesktopTools" -g '*.go' -A 15Repository: beeper/ai-bridge
Length of output: 42
🏁 Script executed:
# Check UserLogin struct and Save method
rg -n "type UserLogin\|func.*Save" pkg/connector/client.go -B 2 -A 3 | head -60Repository: beeper/ai-bridge
Length of output: 42
🏁 Script executed:
rg -n "type UserLogin" -g '*.go' -A 10 | head -40Repository: beeper/ai-bridge
Length of output: 1204
🏁 Script executed:
# Check if there's synchronization around UserLogin updates
rg -n "UserLogin\\.Metadata\|UserLogin\\.Save" -g '*.go' -B 2 -A 2 | head -80Repository: beeper/ai-bridge
Length of output: 42
🏁 Script executed:
# Check the handleDesktopHello function where writes occur
rg -n "handleDesktopHello\|func.*Desktop.*Hello" -g '*.go' -A 25Repository: beeper/ai-bridge
Length of output: 42
🏁 Script executed:
# Find handleDesktopHello and the context where DesktopDevices is modified
rg -n "handleDesktopHello|DesktopDevices\s*\[" -g '*.go' -B 3 -A 8Repository: beeper/ai-bridge
Length of output: 5960
🏁 Script executed:
# Check the complete context of connector.go around line 457 where write occurs
sed -n '450,480p' pkg/connector/connector.goRepository: beeper/ai-bridge
Length of output: 829
🏁 Script executed:
# Check the entire GetPreferredDevice implementation for mutex usage
sed -n '283,310p' pkg/connector/mcp_client.goRepository: beeper/ai-bridge
Length of output: 937
🏁 Script executed:
# Check if there's any synchronization around the entire handleDesktopHelloEvent function
sed -n '407,490p' pkg/connector/connector.goRepository: beeper/ai-bridge
Length of output: 2404
🏁 Script executed:
# Check where handleDesktopHelloEvent is called and if it's in a goroutine
rg -n "handleDesktopHelloEvent\|EventProcessor" pkg/connector/connector.go -B 2 -A 2Repository: beeper/ai-bridge
Length of output: 42
🏁 Script executed:
# Check if UserLogin.Save has any locking semantics
rg -n "func.*Save\(" -g '*.go' -A 3 | head -40Repository: beeper/ai-bridge
Length of output: 42
🏁 Script executed:
# Check if there's broader synchronization around login state or UserLogin
rg -n "type.*Client\s{|type AIClient\s{" -g '*.go' -A 20 | head -80Repository: beeper/ai-bridge
Length of output: 189
🏁 Script executed:
# Look at where GetPreferredDevice is actually called
rg -n "GetPreferredDevice\|HasDesktopTools\|GetDesktopTools" -g '*.go' -B 2 -A 2Repository: beeper/ai-bridge
Length of output: 42
🏁 Script executed:
# Check the goroutine pattern - are reads from GetPreferredDevice happening in separate goroutines?
rg -n "go.*GetPreferredDevice\|go.*HasDesktopTools\|go.*GetDesktopTools\|go func" -g '*.go' -B 2 -A 4 | head -100Repository: beeper/ai-bridge
Length of output: 42
🏁 Script executed:
# Find where GetPreferredDevice/HasDesktopTools/GetDesktopTools are actually called from
rg -n "GetPreferredDevice\|HasDesktopTools\|GetDesktopTools" pkg/connector/ -B 3 -A 1Repository: beeper/ai-bridge
Length of output: 42
🏁 Script executed:
# Check the AIClient and MCPMatrixClient struct definitions to understand synchronization
sed -n '1,250p' pkg/connector/client.go | grep -A 30 "type AIClient struct"Repository: beeper/ai-bridge
Length of output: 940
🏁 Script executed:
# Check if there's synchronization in mcp_client.go around the mutex
sed -n '1,100p' pkg/connector/mcp_client.goRepository: beeper/ai-bridge
Length of output: 2485
🏁 Script executed:
# Final verification: check if there are any test files that might show intended concurrency patterns
rg -n "GetPreferredDevice\|HasDesktopTools\|GetDesktopTools" pkg/connector/test* -g '*.go' 2>/dev/null || echo "No concurrent test patterns found"Repository: beeper/ai-bridge
Length of output: 93
🏁 Script executed:
# Verify if there's any channel-based synchronization or copy pattern elsewhere
rg -n "copy.*DesktopDevices\|DesktopDevices.*=" pkg/connector/ -g '*.go'Repository: beeper/ai-bridge
Length of output: 42
Add synchronization to protect concurrent access to DesktopDevices and PreferredDesktopDeviceID.
GetPreferredDevice() reads DesktopDevices (including iteration at line 300) while handleDesktopHelloEvent() writes to the same map without any synchronization. Go maps panic on concurrent read/write, and this race condition exists across multiple access points: GetPreferredDevice()/GetDesktopTools() in mcp_client.go, writes from handleDesktopHelloEvent() in connector.go, and writes from command handlers in commands.go. Protect these fields with a mutex or use a copy-on-read pattern.
🤖 Prompt for AI Agents
In `@pkg/connector/mcp_client.go` around lines 283 - 322, GetPreferredDevice,
HasDesktopTools and GetDesktopTools access MCPMatrixClient.DesktopDevices and
PreferredDesktopDeviceID without synchronization while handleDesktopHelloEvent
and command handlers mutate them; add a sync.RWMutex (e.g. devicesMu) to the
MCPMatrixClient struct, use devicesMu.RLock()/RUnlock() in GetPreferredDevice,
HasDesktopTools and GetDesktopTools and devicesMu.Lock()/Unlock() in
handleDesktopHelloEvent and all command handlers that modify DesktopDevices or
PreferredDesktopDeviceID, and when returning device.Tools from GetDesktopTools
return a copied slice to avoid sharing mutable memory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@pkg/connector/metadata.go`:
- Around line 94-103: DesktopDeviceInfo.Online is only set true in
handleDesktopHello and never cleared; add staleness-based presence handling: in
handleDesktopHello continue to set Online = true and update LastSeen; implement
a background goroutine (e.g., runDeviceStalenessChecker) that periodically scans
the device registry/map and sets DesktopDeviceInfo.Online = false when
time.Now().Unix()-LastSeen exceeds a configurable timeout (e.g.,
deviceOfflineTimeout) and persist that change; optionally expose the timeout and
stop control so tests can override it. Use the unique symbols DesktopDeviceInfo,
LastSeen, Online, handleDesktopHello, and the device registry/map to locate
where to update state and where to add the staleness checker.
🧹 Nitpick comments (3)
pkg/connector/events.go (1)
744-762:MCPPayloadandMCPJSONRPCBaseare identical - consider consolidating.Both structs have the exact same fields and JSON tags. This duplication may cause confusion and maintenance overhead.
♻️ Consider using a single type or type alias
-// MCPPayload is the payload for MCP request/response/notification IPC messages -type MCPPayload struct { - JSONRPC string `json:"jsonrpc"` - ID any `json:"id,omitempty"` // string, number, or null - Method string `json:"method,omitempty"` - Params map[string]any `json:"params,omitempty"` - Result any `json:"result,omitempty"` - Error *MCPRPCError `json:"error,omitempty"` -} - -// MCPJSONRPCBase represents a generic JSON-RPC 2.0 message -type MCPJSONRPCBase struct { - JSONRPC string `json:"jsonrpc"` - ID any `json:"id,omitempty"` // string, number, or null - Method string `json:"method,omitempty"` - Params map[string]any `json:"params,omitempty"` - Result any `json:"result,omitempty"` - Error *MCPRPCError `json:"error,omitempty"` -} +// MCPJSONRPCBase represents a generic JSON-RPC 2.0 message +// Used for MCP request/response/notification IPC payloads +type MCPJSONRPCBase struct { + JSONRPC string `json:"jsonrpc"` + ID any `json:"id,omitempty"` // string, number, or null + Method string `json:"method,omitempty"` + Params map[string]any `json:"params,omitempty"` + Result any `json:"result,omitempty"` + Error *MCPRPCError `json:"error,omitempty"` +} + +// MCPPayload is an alias for MCPJSONRPCBase for IPC message payloads +type MCPPayload = MCPJSONRPCBasepkg/connector/connector.go (2)
529-538: Goroutine usescontext.Background()- consider propagating cancellation.The MCP initialization goroutine creates a new context with
context.Background(). If the bridge is shutting down, this goroutine won't be notified. Consider usingc.client.backgroundContext()pattern used elsewhere in the codebase, or pass the background context from the bridge.♻️ Use backgroundContext pattern
// Initialize MCP connection asynchronously go func() { - initCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + bgCtx := client.backgroundContext(ctx) + initCtx, cancel := context.WithTimeout(bgCtx, 60*time.Second) defer cancel()
610-618: Same pattern - goroutine usescontext.Background()instead of bridge background context.♻️ Use backgroundContext pattern
// Refresh tools asynchronously go func() { - refreshCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + bgCtx := client.backgroundContext(ctx) + refreshCtx, cancel := context.WithTimeout(bgCtx, 30*time.Second) defer cancel()
| // DesktopDeviceInfo stores information about a connected Beeper Desktop device | ||
| type DesktopDeviceInfo struct { | ||
| DeviceID string `json:"device_id"` | ||
| DeviceName string `json:"device_name,omitempty"` | ||
| RoomID id.RoomID `json:"room_id"` | ||
| LastSeen int64 `json:"last_seen"` // Unix timestamp | ||
| AppVersion string `json:"app_version,omitempty"` | ||
| Tools []MCPTool `json:"tools,omitempty"` // Cached tools from this device | ||
| Online bool `json:"online,omitempty"` // Whether device appears to be online | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Search for where Online is set to false or updated based on device status
rg -n "Online\s*=\s*false|\.Online\s*=" --type goRepository: beeper/ai-bridge
Length of output: 42
🏁 Script executed:
#!/bin/bash
# Search for handleDesktopHello and related device management
rg -n "handleDesktopHello|DesktopDeviceInfo" --type go -A 3 -B 1Repository: beeper/ai-bridge
Length of output: 2961
🏁 Script executed:
#!/bin/bash
# Search for device removal, timeout, or offline detection logic
rg -n "offline|presence|heartbeat|timeout.*device|remove.*device|device.*leave" --type go -iRepository: beeper/ai-bridge
Length of output: 112
🏁 Script executed:
#!/bin/bash
# Get the full handleDesktopHello function implementation
sed -n '466,530p' pkg/connector/connector.goRepository: beeper/ai-bridge
Length of output: 1797
🏁 Script executed:
#!/bin/bash
# Search for all IPC message types and handlers
rg -n "IPC[A-Za-z]*\s*=" --type go | head -20Repository: beeper/ai-bridge
Length of output: 450
🏁 Script executed:
#!/bin/bash
# Search for device cleanup, removal, or leave handlers
rg -n "func.*handle|IPCDesktop|device.*gone|device.*offline" --type go -i | head -30Repository: beeper/ai-bridge
Length of output: 3127
Add mechanism to detect and update offline device status.
DesktopDeviceInfo.Online is set to true in handleDesktopHello, but there's no mechanism to set it to false or detect device disconnections. No offline message type exists in the IPC protocol (only desktop_hello, mcp_request, mcp_response, mcp_notification), and no cleanup or presence-based staleness detection is implemented. The field will remain true indefinitely for disconnected devices.
🤖 Prompt for AI Agents
In `@pkg/connector/metadata.go` around lines 94 - 103, DesktopDeviceInfo.Online is
only set true in handleDesktopHello and never cleared; add staleness-based
presence handling: in handleDesktopHello continue to set Online = true and
update LastSeen; implement a background goroutine (e.g.,
runDeviceStalenessChecker) that periodically scans the device registry/map and
sets DesktopDeviceInfo.Online = false when time.Now().Unix()-LastSeen exceeds a
configurable timeout (e.g., deviceOfflineTimeout) and persist that change;
optionally expose the timeout and stop control so tests can override it. Use the
unique symbols DesktopDeviceInfo, LastSeen, Online, handleDesktopHello, and the
device registry/map to locate where to update state and where to add the
staleness checker.
|
@coderabbitai re-check everything and resolve problems, for remaning ones give ma a full list of prompts for ai agents to fix |
|
Tip For best results, initiate chat on the files or code changes. 🧩 Analysis chain
🏁 Script executed: Length of output: 262963
✅ Issues I Can Help Resolve1. Critical: Context Misuse in Async OperationsFile: Problem: Lines 342-346 and 393-398 use // Line 342 - WRONG
go func() {
initCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
// ...
}()Fix: go func() {
// Use bridge's background context as parent
initCtx, cancel := context.WithTimeout(oc.br.BackgroundCtx, 60*time.Second)
defer cancel()
if err := client.initializeMCPConnection(initCtx, ipc.DeviceID); err != nil {
log.Err(err).Msg("Failed to initialize MCP connection")
}
}()Apply same fix to line 393-398. 2. Critical: Goroutine Leak in MCP ClientFile: Problem: Lines 102-106 use // WRONG - time.After leaks
select {
case resp := <-respChan:
return resp, nil
case <-time.After(timeout):
return nil, fmt.Errorf("MCP request timed out after %v", timeout)
case <-ctx.Done():
return nil, ctx.Err()
}Fix: timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case resp := <-respChan:
return resp, nil
case <-timer.C:
return nil, fmt.Errorf("MCP request timed out after %v", timeout)
case <-ctx.Done():
return nil, ctx.Err()
}3. High: Race Condition on DesktopDevices MapFile: Problem: Lines 322-333 access Fix: Add mutex to UserLoginMetadata: // In metadata.go
type UserLoginMetadata struct {
// ... existing fields ...
DesktopDevices map[string]*DesktopDeviceInfo `json:"desktop_devices,omitempty"`
deviceMu sync.RWMutex `json:"-"` // Protects DesktopDevices
}Then in connector.go: // Line 322
meta.deviceMu.Lock()
defer meta.deviceMu.Unlock()
if meta.DesktopDevices == nil {
meta.DesktopDevices = make(map[string]*DesktopDeviceInfo)
}
meta.DesktopDevices[ipc.DeviceID] = &DesktopDeviceInfo{
// ...
}4. Medium: Missing Input ValidationFile: Problem: No validation of deviceID, roomID before operations. Fix: Add validation at start of key methods: func (c *MCPMatrixClient) Call(ctx context.Context, roomID id.RoomID, deviceID string, method string, params map[string]any) (*MCPJSONRPCBase, error) {
if roomID == "" {
return nil, fmt.Errorf("roomID is required")
}
if deviceID == "" {
return nil, fmt.Errorf("deviceID is required")
}
if method == "" {
return nil, fmt.Errorf("method is required")
}
// ... rest of implementation
}5. Medium: No Cleanup of Stale DevicesFile: Problem: Devices marked as Online but never cleaned up if Desktop disconnects ungracefully. Fix: Add periodic cleanup in connector.go: func (oc *OpenAIConnector) Start(ctx context.Context) error {
// ... existing code ...
// Start device cleanup goroutine
go oc.cleanupStaleDevices(ctx)
return nil
}
func (oc *OpenAIConnector) cleanupStaleDevices(ctx context.Context) {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
oc.markStaleDevicesOffline()
}
}
}
func (oc *OpenAIConnector) markStaleDevicesOffline() {
// Iterate all logins and mark devices offline if LastSeen > 5 minutes
// Implementation needed
}📋 Prompts for AI Agents (Remaining Issues)Prompt 1: Add Comprehensive Error Handling to IPC HandlersPrompt 2: Add Unit Tests for MCP ClientPrompt 3: Add Input Sanitization for MCP PayloadsPrompt 4: Improve Documentation and Error MessagesPrompt 5: Add Device Connection State MachinePrompt 6: Add Rate Limiting for MCP CallsPrompt 7: Add Integration Tests for Full MCP Flow📊 SummaryCritical Issues: 3 (context misuse, goroutine leak, race condition) The critical and high-priority issues should be fixed before merging. The remaining issues can be addressed in follow-up PRs using the provided AI agent prompts. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@pkg/connector/commands.go`:
- Around line 526-551: This code reads loginMeta.DesktopDevices and
loginMeta.PreferredDesktopDeviceID without synchronization; once you add a mutex
to UserLoginMetadata, acquire the appropriate lock (e.g., RLock/RUnlock for
read-only access or Lock/Unlock if you prefer) on loginMeta before any reads in
the "!ai desktop list" branch (the block that checks len(ce.Args) == 0 ||
ce.Args[0] == "list") and release it after building the response so concurrent
writers such as handleDesktopHello cannot mutate DesktopDevices mid-read; ensure
the lock scope covers all uses of loginMeta.DesktopDevices,
loginMeta.PreferredDesktopDeviceID, and related fields in that block.
- Around line 610-617: The goroutine uses the request-scoped context ce.Ctx
which may be cancelled when the handler returns; update the async work to use a
detached/background context (e.g., context.Background() or the connector/bridge
long-lived context if available) so initializeMCPConnection and sendSystemNotice
run reliably; modify the anonymous goroutine to create/use that background
context instead of ce.Ctx and pass the same detached ctx into
client.initializeMCPConnection(...) and client.sendSystemNotice(...), while
still using device.DeviceID and ce.Portal as before.
🧹 Nitpick comments (2)
pkg/connector/handleai.go (1)
1019-1025: Consider extracting MCP tool addition to reduce duplication.This logic is duplicated from
buildResponsesAPIParams(lines 296-301). Additionally, the debug log present there is missing here.♻️ Optional: Extract helper to reduce duplication
// getMCPToolParams returns OpenAI tool params for MCP tools, or nil if none available func (oc *AIClient) getMCPToolParams() []responses.ToolUnionParam { if oc.mcpClient == nil || !oc.mcpClient.HasDesktopTools() { return nil } mcpTools := oc.mcpClient.GetDesktopTools() return ToOpenAITools(MCPToolsToDefinitions(mcpTools)) }Then in both functions:
if meta.Capabilities.SupportsToolCalling { // ... builtin tools ... - if oc.mcpClient != nil && oc.mcpClient.HasDesktopTools() { - mcpTools := oc.mcpClient.GetDesktopTools() - mcpDefs := MCPToolsToDefinitions(mcpTools) - params.Tools = append(params.Tools, ToOpenAITools(mcpDefs)...) - } + if mcpTools := oc.getMCPToolParams(); len(mcpTools) > 0 { + params.Tools = append(params.Tools, mcpTools...) + log.Debug().Int("count", len(mcpTools)).Msg("Added MCP tools from Desktop") + } }pkg/connector/commands.go (1)
594-600: Non-deterministic device selection from map iteration.Go map iteration order is undefined, so when no preferred device is set, a different device may be selected on each invocation. Consider sorting by device ID or selecting the most recently connected device for consistent behavior.
| if len(ce.Args) == 0 || ce.Args[0] == "list" { | ||
| // List connected devices | ||
| if len(loginMeta.DesktopDevices) == 0 { | ||
| ce.Reply("No Desktop devices connected. Open the AI bridge account pane in Beeper Desktop and click \"Connect Desktop\" to link a device.") | ||
| return | ||
| } | ||
|
|
||
| var sb strings.Builder | ||
| sb.WriteString("Connected Desktop devices:\n\n") | ||
| for _, device := range loginMeta.DesktopDevices { | ||
| preferred := "" | ||
| if device.DeviceID == loginMeta.PreferredDesktopDeviceID { | ||
| preferred = " (preferred)" | ||
| } | ||
| online := "offline" | ||
| if device.Online { | ||
| online = "online" | ||
| } | ||
| toolCount := len(device.Tools) | ||
| sb.WriteString(fmt.Sprintf("• **%s** (`%s`)%s\n", device.DeviceName, device.DeviceID, preferred)) | ||
| sb.WriteString(fmt.Sprintf(" Status: %s, Tools: %d, Room: %s\n", online, toolCount, device.RoomID)) | ||
| } | ||
| sb.WriteString("\nCommands:\n• `!ai desktop tools` - List available MCP tools\n• `!ai desktop init` - Re-initialize MCP connection\n• `!ai desktop prefer <device_id>` - Set preferred device") | ||
| ce.Reply(sb.String()) | ||
| return | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Concurrent map access requires synchronization.
Multiple accesses to loginMeta.DesktopDevices throughout this function are not synchronized. As noted in PR discussion, this map can be modified concurrently by IPC handlers (e.g., handleDesktopHello). Once a mutex is added to UserLoginMetadata, ensure this function acquires the lock before reading from the map.
🤖 Prompt for AI Agents
In `@pkg/connector/commands.go` around lines 526 - 551, This code reads
loginMeta.DesktopDevices and loginMeta.PreferredDesktopDeviceID without
synchronization; once you add a mutex to UserLoginMetadata, acquire the
appropriate lock (e.g., RLock/RUnlock for read-only access or Lock/Unlock if you
prefer) on loginMeta before any reads in the "!ai desktop list" branch (the
block that checks len(ce.Args) == 0 || ce.Args[0] == "list") and release it
after building the response so concurrent writers such as handleDesktopHello
cannot mutate DesktopDevices mid-read; ensure the lock scope covers all uses of
loginMeta.DesktopDevices, loginMeta.PreferredDesktopDeviceID, and related fields
in that block.
| go func() { | ||
| if err := client.initializeMCPConnection(ce.Ctx, device.DeviceID); err != nil { | ||
| client.sendSystemNotice(ce.Ctx, ce.Portal, fmt.Sprintf("MCP init failed: %v", err)) | ||
| } else { | ||
| toolCount := len(device.Tools) | ||
| client.sendSystemNotice(ce.Ctx, ce.Portal, fmt.Sprintf("MCP initialized! %d tools available", toolCount)) | ||
| } | ||
| }() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Context may be cancelled before async operation completes.
The goroutine uses ce.Ctx which may be tied to the command event lifecycle. If the context is cancelled after the handler returns, initializeMCPConnection and sendSystemNotice could fail unexpectedly.
Consider deriving a detached context or using the bridge's background context for long-running async operations.
🔧 Proposed fix
// Run async
go func() {
- if err := client.initializeMCPConnection(ce.Ctx, device.DeviceID); err != nil {
- client.sendSystemNotice(ce.Ctx, ce.Portal, fmt.Sprintf("MCP init failed: %v", err))
+ ctx := client.UserLogin.Bridge.BackgroundCtx()
+ if err := client.initializeMCPConnection(ctx, device.DeviceID); err != nil {
+ client.sendSystemNotice(ctx, ce.Portal, fmt.Sprintf("MCP init failed: %v", err))
} else {
- toolCount := len(device.Tools)
- client.sendSystemNotice(ce.Ctx, ce.Portal, fmt.Sprintf("MCP initialized! %d tools available", toolCount))
+ toolCount := len(client.mcpClient.GetDesktopTools())
+ client.sendSystemNotice(ctx, ce.Portal, fmt.Sprintf("MCP initialized! %d tools available", toolCount))
}
}()🤖 Prompt for AI Agents
In `@pkg/connector/commands.go` around lines 610 - 617, The goroutine uses the
request-scoped context ce.Ctx which may be cancelled when the handler returns;
update the async work to use a detached/background context (e.g.,
context.Background() or the connector/bridge long-lived context if available) so
initializeMCPConnection and sendSystemNotice run reliably; modify the anonymous
goroutine to create/use that background context instead of ce.Ctx and pass the
same detached ctx into client.initializeMCPConnection(...) and
client.sendSystemNotice(...), while still using device.DeviceID and ce.Portal as
before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
🤖 Fix all issues with AI agents
In `@pkg/connector/client.go`:
- Around line 1519-1549: refreshMCPTools mutates device.Tools and
device.LastSeen without synchronization; wrap the mutation (device.Tools =
tools; device.LastSeen = ...) and the subsequent Save call in the same mutex
that guards access to DesktopDevices (the same lock used by
GetDesktopTools/other DesktopDevices readers) so readers can't see a
torn/partial update—acquire the mutex before updating, assign tools and LastSeen
while holding it, release the mutex and then proceed with UserLogin.Save (or if
Save must be atomic with the update, call Save while still holding the mutex) to
eliminate the race.
In `@pkg/connector/connector.go`:
- Around line 610-619: The goroutine that calls
client.refreshMCPTools(refreshCtx, ipc.DeviceID) should derive its timeout from
the bridge/shutdown-aware context instead of context.Background(); replace
context.WithTimeout(context.Background(), 30*time.Second) with
context.WithTimeout(bridgeCtx, 30*time.Second) (or the existing
bridge/shutdown-aware context variable used elsewhere like in
handleDesktopHello) so the refreshMCPTools call is cancelled when the bridge
shuts down.
- Around line 501-519: The DesktopDevices map in loginMetadata() is mutated
without synchronization causing a data race; add a sync.Mutex field (e.g.,
deviceMu sync.Mutex `json:"-"`) to the UserLoginMetadata struct (in metadata.go)
and wrap reads/writes to meta.DesktopDevices and meta.PreferredDesktopDeviceID
in deviceMu.Lock()/deviceMu.Unlock() around the code that creates/assigns the
DesktopDeviceInfo and sets PreferredDesktopDeviceID so concurrent hello events
are serialized.
- Around line 529-538: The goroutine uses context.Background() which detaches it
from the bridge lifecycle; change the closure to use the bridge's context (e.g.,
bridgeCtx or the existing ctx used for connector lifecycle) instead of
context.Background() so that client.initializeMCPConnection(initCtx,
ipc.DeviceID) is cancelled when the bridge shuts down; capture that bridge
context in the goroutine, create the timeout with context.WithTimeout(bridgeCtx,
60*time.Second), and keep the call to client.initializeMCPConnection(initCtx,
ipc.DeviceID) and the error logging unchanged.
In `@pkg/connector/mcp_client.go`:
- Around line 90-97: The select uses time.After which leaks timers; replace
time.After(timeout) with a time.NewTimer(timeout) stored in a local variable
(e.g., t := time.NewTimer(timeout)) and use t.C in the select, then ensure you
call t.Stop() when returning from the other cases to avoid the leak; adjust the
select in the function using respChan, ctx, and timeout (the block that
currently returns resp, nil / timeout error / ctx.Err()) so that the timer is
stopped when respChan or ctx finishes and only allowed to expire for the timeout
branch.
In `@pkg/connector/metadata.go`:
- Around line 85-103: The DesktopDevices map on UserLoginMetadata is accessed
concurrently (written in handleDesktopHello and read in
GetPreferredDevice/GetDesktopTools) and must be synchronized to avoid panics;
add a devicesMu sync.RWMutex field to UserLoginMetadata (with `json:"-"`)
protecting DesktopDevices and PreferredDesktopDeviceID, update writes (e.g., in
handleDesktopHello) to use devicesMu.Lock/Unlock and reads (e.g., in
GetPreferredDevice, GetDesktopTools) to use devicesMu.RLock/RUnlock, and ensure
any code that iterates or updates MCPTool entries uses the appropriate
lock/unlock around the map access.
🧹 Nitpick comments (3)
pkg/connector/events.go (1)
744-762:MCPPayloadandMCPJSONRPCBaseare structurally identical.Both structs have the exact same fields with the same JSON tags. Consider consolidating into a single type or using a type alias to reduce duplication and maintenance burden.
-// MCPPayload is the payload for MCP request/response/notification IPC messages -type MCPPayload struct { - JSONRPC string `json:"jsonrpc"` - ID any `json:"id,omitempty"` // string, number, or null - Method string `json:"method,omitempty"` - Params map[string]any `json:"params,omitempty"` - Result any `json:"result,omitempty"` - Error *MCPRPCError `json:"error,omitempty"` -} - -// MCPJSONRPCBase represents a generic JSON-RPC 2.0 message -type MCPJSONRPCBase struct { - JSONRPC string `json:"jsonrpc"` - ID any `json:"id,omitempty"` // string, number, or null - Method string `json:"method,omitempty"` - Params map[string]any `json:"params,omitempty"` - Result any `json:"result,omitempty"` - Error *MCPRPCError `json:"error,omitempty"` -} +// MCPJSONRPCBase represents a generic JSON-RPC 2.0 message +type MCPJSONRPCBase struct { + JSONRPC string `json:"jsonrpc"` + ID any `json:"id,omitempty"` // string, number, or null + Method string `json:"method,omitempty"` + Params map[string]any `json:"params,omitempty"` + Result any `json:"result,omitempty"` + Error *MCPRPCError `json:"error,omitempty"` +} + +// MCPPayload is an alias for MCPJSONRPCBase used in IPC message payloads +type MCPPayload = MCPJSONRPCBasepkg/connector/mcp_client.go (1)
41-73: Add input validation forCallparameters.The method doesn't validate that
roomID,deviceID, ormethodare non-empty before proceeding. Empty values could cause confusing errors downstream or send malformed IPC messages.Suggested validation
func (c *MCPMatrixClient) Call(ctx context.Context, roomID id.RoomID, deviceID string, method string, params map[string]any) (*MCPJSONRPCBase, error) { + if roomID == "" { + return nil, fmt.Errorf("roomID is required") + } + if deviceID == "" { + return nil, fmt.Errorf("deviceID is required") + } + if method == "" { + return nil, fmt.Errorf("method is required") + } + requestID := NewCallID()pkg/connector/chat.go (1)
501-540: Redundant portal fetch can be simplified.Lines 506 and 522 both call
GetPortalByKeywith the same key. When the portal exists but has no MXID, you could reuseexistingPortalinstead of fetching again.♻️ Suggested simplification
func (oc *AIClient) createMCPRelayRoom(ctx context.Context) (*bridgev2.CreateChatResponse, error) { portalKey := portalKeyForMCPRelay(oc.UserLogin.ID) - // Check if portal already exists - existingPortal, err := oc.UserLogin.Bridge.GetPortalByKey(ctx, portalKey) - if err == nil && existingPortal != nil && existingPortal.MXID != "" { + // Get or create portal + portal, err := oc.UserLogin.Bridge.GetPortalByKey(ctx, portalKey) + if err != nil { + return nil, fmt.Errorf("failed to get/create portal: %w", err) + } + + // Return existing if already has Matrix room + if portal.MXID != "" { oc.log.Info(). Stringer("portal_key", portalKey). - Stringer("room_id", existingPortal.MXID). + Stringer("room_id", portal.MXID). Msg("MCP relay room already exists") return &bridgev2.CreateChatResponse{ - Portal: existingPortal, + Portal: portal, PortalKey: portalKey, PortalInfo: oc.buildMCPRelayChatInfo(), }, nil } - // Create new MCP relay portal + // Configure new MCP relay portal oc.log.Info().Stringer("portal_key", portalKey).Msg("Creating new MCP relay room") - - portal, err := oc.UserLogin.Bridge.GetPortalByKey(ctx, portalKey) - if err != nil { - return nil, fmt.Errorf("failed to get/create portal: %w", err) - } - chatInfo := oc.buildMCPRelayChatInfo()
| // refreshMCPTools refreshes the cached tools from a Desktop device | ||
| func (oc *AIClient) refreshMCPTools(ctx context.Context, deviceID string) error { | ||
| meta := loginMetadata(oc.UserLogin) | ||
| device, ok := meta.DesktopDevices[deviceID] | ||
| if !ok { | ||
| return fmt.Errorf("device %s not found", deviceID) | ||
| } | ||
|
|
||
| oc.log.Debug().Str("device_id", deviceID).Msg("Refreshing MCP tools") | ||
|
|
||
| tools, err := oc.mcpClient.ListTools(ctx, device.RoomID, deviceID) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to list tools: %w", err) | ||
| } | ||
|
|
||
| // Update cached tools | ||
| device.Tools = tools | ||
| device.LastSeen = time.Now().Unix() | ||
|
|
||
| // Save updated metadata | ||
| if err := oc.UserLogin.Save(ctx); err != nil { | ||
| oc.log.Warn().Err(err).Msg("Failed to save login metadata after tools refresh") | ||
| } | ||
|
|
||
| oc.log.Info(). | ||
| Str("device_id", deviceID). | ||
| Int("tool_count", len(tools)). | ||
| Msg("Refreshed MCP tools from Desktop") | ||
|
|
||
| return nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential race: device.Tools modified without synchronization.
refreshMCPTools directly mutates device.Tools (line 1535) while other goroutines may be reading it via GetDesktopTools(). This is part of the same race condition affecting DesktopDevices. Ensure the mutex covers this mutation as well.
+ // Lock before modifying device state
+ // (assuming devicesMu is added to UserLoginMetadata)
+ meta.devicesMu.Lock()
device.Tools = tools
device.LastSeen = time.Now().Unix()
+ meta.devicesMu.Unlock()🤖 Prompt for AI Agents
In `@pkg/connector/client.go` around lines 1519 - 1549, refreshMCPTools mutates
device.Tools and device.LastSeen without synchronization; wrap the mutation
(device.Tools = tools; device.LastSeen = ...) and the subsequent Save call in
the same mutex that guards access to DesktopDevices (the same lock used by
GetDesktopTools/other DesktopDevices readers) so readers can't see a
torn/partial update—acquire the mutex before updating, assign tools and LastSeen
while holding it, release the mutex and then proceed with UserLogin.Save (or if
Save must be atomic with the update, call Save while still holding the mutex) to
eliminate the race.
| // Store the device info | ||
| meta := loginMetadata(login) | ||
| if meta.DesktopDevices == nil { | ||
| meta.DesktopDevices = make(map[string]*DesktopDeviceInfo) | ||
| } | ||
|
|
||
| meta.DesktopDevices[ipc.DeviceID] = &DesktopDeviceInfo{ | ||
| DeviceID: ipc.DeviceID, | ||
| DeviceName: payload.DeviceName, | ||
| RoomID: evt.RoomID, | ||
| LastSeen: time.Now().Unix(), | ||
| AppVersion: payload.AppVersion, | ||
| Online: true, | ||
| } | ||
|
|
||
| // Set as preferred if no preference exists | ||
| if meta.PreferredDesktopDeviceID == "" { | ||
| meta.PreferredDesktopDeviceID = ipc.DeviceID | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Race condition on DesktopDevices map.
The DesktopDevices map is accessed and modified without synchronization. If multiple desktop hello events arrive concurrently (e.g., from different devices), this can cause a data race. The PR objectives explicitly flagged this as a critical issue.
🔒 Suggested fix: Add mutex protection
Add a mutex to UserLoginMetadata (in metadata.go) and use it here:
+ meta.deviceMu.Lock()
if meta.DesktopDevices == nil {
meta.DesktopDevices = make(map[string]*DesktopDeviceInfo)
}
meta.DesktopDevices[ipc.DeviceID] = &DesktopDeviceInfo{
DeviceID: ipc.DeviceID,
DeviceName: payload.DeviceName,
RoomID: evt.RoomID,
LastSeen: time.Now().Unix(),
AppVersion: payload.AppVersion,
Online: true,
}
// Set as preferred if no preference exists
if meta.PreferredDesktopDeviceID == "" {
meta.PreferredDesktopDeviceID = ipc.DeviceID
}
+ meta.deviceMu.Unlock()The mutex should be added to UserLoginMetadata in metadata.go:
type UserLoginMetadata struct {
// ... existing fields
deviceMu sync.Mutex `json:"-"` // Protects DesktopDevices
}🤖 Prompt for AI Agents
In `@pkg/connector/connector.go` around lines 501 - 519, The DesktopDevices map in
loginMetadata() is mutated without synchronization causing a data race; add a
sync.Mutex field (e.g., deviceMu sync.Mutex `json:"-"`) to the UserLoginMetadata
struct (in metadata.go) and wrap reads/writes to meta.DesktopDevices and
meta.PreferredDesktopDeviceID in deviceMu.Lock()/deviceMu.Unlock() around the
code that creates/assigns the DesktopDeviceInfo and sets
PreferredDesktopDeviceID so concurrent hello events are serialized.
| // Initialize MCP connection asynchronously | ||
| go func() { | ||
| initCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) | ||
| defer cancel() | ||
|
|
||
| if err := client.initializeMCPConnection(initCtx, ipc.DeviceID); err != nil { | ||
| log.Err(err).Msg("Failed to initialize MCP connection") | ||
| } | ||
| }() | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use bridge context instead of context.Background() for goroutine.
Using context.Background() disconnects the goroutine from the bridge's lifecycle. If the bridge shuts down, this goroutine won't be cancelled and could leak or cause issues. The PR objectives flagged this as a critical issue.
🛠️ Suggested fix
// Initialize MCP connection asynchronously
go func() {
- initCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+ initCtx, cancel := context.WithTimeout(oc.br.BackgroundCtx, 60*time.Second)
defer cancel()
if err := client.initializeMCPConnection(initCtx, ipc.DeviceID); err != nil {
log.Err(err).Msg("Failed to initialize MCP connection")
}
}()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Initialize MCP connection asynchronously | |
| go func() { | |
| initCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) | |
| defer cancel() | |
| if err := client.initializeMCPConnection(initCtx, ipc.DeviceID); err != nil { | |
| log.Err(err).Msg("Failed to initialize MCP connection") | |
| } | |
| }() | |
| } | |
| // Initialize MCP connection asynchronously | |
| go func() { | |
| initCtx, cancel := context.WithTimeout(oc.br.BackgroundCtx, 60*time.Second) | |
| defer cancel() | |
| if err := client.initializeMCPConnection(initCtx, ipc.DeviceID); err != nil { | |
| log.Err(err).Msg("Failed to initialize MCP connection") | |
| } | |
| }() | |
| } |
🤖 Prompt for AI Agents
In `@pkg/connector/connector.go` around lines 529 - 538, The goroutine uses
context.Background() which detaches it from the bridge lifecycle; change the
closure to use the bridge's context (e.g., bridgeCtx or the existing ctx used
for connector lifecycle) instead of context.Background() so that
client.initializeMCPConnection(initCtx, ipc.DeviceID) is cancelled when the
bridge shuts down; capture that bridge context in the goroutine, create the
timeout with context.WithTimeout(bridgeCtx, 60*time.Second), and keep the call
to client.initializeMCPConnection(initCtx, ipc.DeviceID) and the error logging
unchanged.
| // Refresh tools asynchronously | ||
| go func() { | ||
| refreshCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | ||
| defer cancel() | ||
|
|
||
| if err := client.refreshMCPTools(refreshCtx, ipc.DeviceID); err != nil { | ||
| log.Err(err).Msg("Failed to refresh MCP tools") | ||
| } | ||
| }() | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use bridge context instead of context.Background() in notification handler goroutine.
Same issue as in handleDesktopHello - the goroutine uses context.Background() which won't be cancelled on bridge shutdown.
🛠️ Suggested fix
// Refresh tools asynchronously
go func() {
- refreshCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ refreshCtx, cancel := context.WithTimeout(oc.br.BackgroundCtx, 30*time.Second)
defer cancel()
if err := client.refreshMCPTools(refreshCtx, ipc.DeviceID); err != nil {
log.Err(err).Msg("Failed to refresh MCP tools")
}
}()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Refresh tools asynchronously | |
| go func() { | |
| refreshCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | |
| defer cancel() | |
| if err := client.refreshMCPTools(refreshCtx, ipc.DeviceID); err != nil { | |
| log.Err(err).Msg("Failed to refresh MCP tools") | |
| } | |
| }() | |
| } | |
| // Refresh tools asynchronously | |
| go func() { | |
| refreshCtx, cancel := context.WithTimeout(oc.br.BackgroundCtx, 30*time.Second) | |
| defer cancel() | |
| if err := client.refreshMCPTools(refreshCtx, ipc.DeviceID); err != nil { | |
| log.Err(err).Msg("Failed to refresh MCP tools") | |
| } | |
| }() | |
| } |
🤖 Prompt for AI Agents
In `@pkg/connector/connector.go` around lines 610 - 619, The goroutine that calls
client.refreshMCPTools(refreshCtx, ipc.DeviceID) should derive its timeout from
the bridge/shutdown-aware context instead of context.Background(); replace
context.WithTimeout(context.Background(), 30*time.Second) with
context.WithTimeout(bridgeCtx, 30*time.Second) (or the existing
bridge/shutdown-aware context variable used elsewhere like in
handleDesktopHello) so the refreshMCPTools call is cancelled when the bridge
shuts down.
| select { | ||
| case resp := <-respChan: | ||
| return resp, nil | ||
| case <-time.After(timeout): | ||
| return nil, fmt.Errorf("MCP request timed out after %v", timeout) | ||
| case <-ctx.Done(): | ||
| return nil, ctx.Err() | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Timer leak: time.After in select creates timers that won't be GC'd until expiration.
Each call to time.After allocates a timer that persists until it fires, even if the select completes via another case. Under load with many timeouts, this leaks memory. Use time.NewTimer with explicit Stop().
Recommended fix
// Wait for response with timeout
timeout := MCPRequestTimeout
if deadline, ok := ctx.Deadline(); ok {
remaining := time.Until(deadline)
if remaining < timeout {
timeout = remaining
}
}
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+
select {
case resp := <-respChan:
return resp, nil
- case <-time.After(timeout):
+ case <-timer.C:
return nil, fmt.Errorf("MCP request timed out after %v", timeout)
case <-ctx.Done():
return nil, ctx.Err()
}🤖 Prompt for AI Agents
In `@pkg/connector/mcp_client.go` around lines 90 - 97, The select uses time.After
which leaks timers; replace time.After(timeout) with a time.NewTimer(timeout)
stored in a local variable (e.g., t := time.NewTimer(timeout)) and use t.C in
the select, then ensure you call t.Stop() when returning from the other cases to
avoid the leak; adjust the select in the function using respChan, ctx, and
timeout (the block that currently returns resp, nil / timeout error / ctx.Err())
so that the timer is stopped when respChan or ctx finishes and only allowed to
expire for the timeout branch.
|
|
||
| // Desktop IPC devices - maps device_id to device info | ||
| // Each device has its own IPC room for MCP communication | ||
| DesktopDevices map[string]*DesktopDeviceInfo `json:"desktop_devices,omitempty"` | ||
|
|
||
| // Preferred desktop device for MCP tool calls | ||
| PreferredDesktopDeviceID string `json:"preferred_desktop_device_id,omitempty"` | ||
| } | ||
|
|
||
| // DesktopDeviceInfo stores information about a connected Beeper Desktop device | ||
| type DesktopDeviceInfo struct { | ||
| DeviceID string `json:"device_id"` | ||
| DeviceName string `json:"device_name,omitempty"` | ||
| RoomID id.RoomID `json:"room_id"` | ||
| LastSeen int64 `json:"last_seen"` // Unix timestamp | ||
| AppVersion string `json:"app_version,omitempty"` | ||
| Tools []MCPTool `json:"tools,omitempty"` // Cached tools from this device | ||
| Online bool `json:"online,omitempty"` // Whether device appears to be online | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Race condition: DesktopDevices map requires synchronization.
As noted in the PR discussion, DesktopDevices is accessed concurrently from IPC handlers (writes in handleDesktopHello) and from GetPreferredDevice/GetDesktopTools (reads in mcp_client.go). Go maps panic on concurrent read/write. Add a mutex to UserLoginMetadata or use a thread-safe access pattern.
Suggested approach
// Add to UserLoginMetadata or a wrapper:
type UserLoginMetadata struct {
// ... existing fields ...
// Desktop IPC devices - protected by devicesMu
DesktopDevices map[string]*DesktopDeviceInfo `json:"desktop_devices,omitempty"`
PreferredDesktopDeviceID string `json:"preferred_desktop_device_id,omitempty"`
// devicesMu protects DesktopDevices and PreferredDesktopDeviceID
// Note: sync.RWMutex doesn't serialize to JSON
devicesMu sync.RWMutex `json:"-"`
}Then use RLock/RUnlock for reads and Lock/Unlock for writes throughout the codebase.
🧰 Tools
🪛 GitHub Actions: Go
[error] 93-93: Pre-commit hook go-imports-repo failed. Hooks modified files: pkg/connector/metadata.go. Please re-commit after reviewing changes.
🤖 Prompt for AI Agents
In `@pkg/connector/metadata.go` around lines 85 - 103, The DesktopDevices map on
UserLoginMetadata is accessed concurrently (written in handleDesktopHello and
read in GetPreferredDevice/GetDesktopTools) and must be synchronized to avoid
panics; add a devicesMu sync.RWMutex field to UserLoginMetadata (with
`json:"-"`) protecting DesktopDevices and PreferredDesktopDeviceID, update
writes (e.g., in handleDesktopHello) to use devicesMu.Lock/Unlock and reads
(e.g., in GetPreferredDevice, GetDesktopTools) to use devicesMu.RLock/RUnlock,
and ensure any code that iterates or updates MCPTool entries uses the
appropriate lock/unlock around the map access.
No description provided.